/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.redis;

import com.chinamobile.cmss.lakehouse.common.dto.SqlQueryResultDto;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
 * Helper around {@link RedisClient} for per-cluster running-task counters and for reading
 * hash-encoded query results.
 *
 * <p>Every Redis interaction in this class happens while holding one in-process monitor, so the
 * read-modify-write sequences below do not interleave between threads of the same JVM.
 * NOTE(review): that monitor cannot coordinate with other processes writing to the same Redis
 * keys — confirm whether a single writer instance is guaranteed.
 */
@Component
@Slf4j
public class RedisOperateClient {

    /** Prefix of every running-task counter key; the cluster name is appended to it. */
    public final String redisKeyPrefix = "cluster:running:";
    /** Glob pattern matching all running-task counter keys. */
    private final String redisKeyPattern = redisKeyPrefix + "*";

    @Autowired
    private RedisClient redisClient;
    /** Monitor guarding all Redis access performed by this class. */
    private final Object clusterRunningTaskLock = new Object();

    /**
     * Builds the Redis key of a cluster's running-task counter.
     *
     * @param clusterName cluster name
     * @return {@code "cluster:running:" + clusterName}
     */
    public String generateRedisKey(String clusterName) {
        return redisKeyPrefix + clusterName;
    }

    /**
     * Adds a delta to several counters.
     *
     * <p>The map keys are used as Redis keys verbatim (no prefix is added here), which matches
     * the key format returned by {@link #getRunningTask()}.
     *
     * @param tasks Redis key to delta (may be negative); a {@code null} or empty map is a no-op
     * @throws NumberFormatException if a stored value is not a valid {@code long}
     */
    public void updateRunningTask(Map<String, Long> tasks) {
        if (tasks == null || tasks.isEmpty()) {
            return;
        }
        synchronized (clusterRunningTaskLock) {
            log.info("Running task info will change: {}", tasks);
            for (Map.Entry<String, Long> entry : tasks.entrySet()) {
                Long delta = entry.getValue();
                if (delta == null) {
                    // Skip instead of failing on unboxing after earlier keys were already written.
                    log.warn("Skip running task update with null change, key: {}", entry.getKey());
                    continue;
                }
                String key = entry.getKey();
                long current = parseCount(redisClient.get(key));
                redisClient.set(key, current + delta);
            }
        }
    }

    /**
     * Adds a delta to the counter of one cluster.
     *
     * @param clusterName cluster name; converted to a key with {@link #generateRedisKey(String)}
     * @param change      delta to add (may be negative)
     * @return the value written back to Redis
     * @throws NumberFormatException if the stored value is not a valid {@code long}
     */
    public long updateRunningTask(String clusterName, long change) {
        String key = generateRedisKey(clusterName);
        synchronized (clusterRunningTaskLock) {
            log.info("Cluster: {} running task info will change: {}", clusterName, change);
            long result = parseCount(redisClient.get(key)) + change;
            redisClient.set(key, result);
            return result;
        }
    }

    /**
     * Reads every counter whose key matches {@code cluster:running:*}.
     *
     * @return a new mutable map from full Redis key (prefix included) to counter value; a key
     *     whose value is missing is reported as 0; empty when no key matches
     * @throws NumberFormatException if a stored value is not a valid {@code long}
     */
    public Map<String, Long> getRunningTask() {
        List<String> keys;
        List<String> values;
        synchronized (clusterRunningTaskLock) {
            Set<String> matched = redisClient.keys(redisKeyPattern);
            // Same null/empty guard as readAsHashByJobPrefix applies to the keys() result.
            if (CollectionUtils.isEmpty(matched)) {
                return new HashMap<>();
            }
            // Freeze the key order so values can be paired with keys by index.
            keys = new ArrayList<>(matched);
            values = redisClient.mget(keys.toArray(new String[0]));
        }
        Map<String, Long> runningEachCluster = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            // A null reply list is treated like "no value stored" for every key.
            String raw = values == null ? null : values.get(i);
            runningEachCluster.put(keys.get(i), parseCount(raw));
        }
        return runningEachCluster;
    }

    /**
     * Reads the counter of one cluster.
     *
     * @param clusterName cluster name; converted to a key with {@link #generateRedisKey(String)}
     * @return the stored counter, or 0 when the key has no value
     * @throws NumberFormatException if the stored value is not a valid {@code long}
     */
    public long getRunningTask(String clusterName) {
        String key = generateRedisKey(clusterName);
        synchronized (clusterRunningTaskLock) {
            return parseCount(redisClient.get(key));
        }
    }

    /**
     * Collects all Redis hashes whose key matches {@code jobId + ":*"} into a tabular result.
     *
     * <p>The column list is the union of the field names of all matched hashes. Each matched
     * hash contributes one row; a field that has no value in a hash is rendered as an empty
     * string. Row order follows the iteration order of the key set returned by Redis.
     *
     * @param jobId job id used as the key prefix
     * @return a DTO with column list and result set populated, or a DTO on which neither setter
     *     was called when no key matches
     */
    public SqlQueryResultDto readAsHashByJobPrefix(String jobId) {
        SqlQueryResultDto sqlQueryResultDto = new SqlQueryResultDto();

        synchronized (clusterRunningTaskLock) {
            Set<String> keys = redisClient.keys(jobId + ":*");
            if (CollectionUtils.isEmpty(keys)) {
                return sqlQueryResultDto;
            }
            // Union of all field names. Insertion-ordered on purpose: the same order must be
            // used for the column list and for every per-hash field lookup below. A plain
            // HashSet and a copy of it may iterate differently because the copy is presized
            // to another table capacity.
            Set<String> schema = new LinkedHashSet<>();
            for (String key : keys) {
                schema.addAll(redisClient.hkeys(key));
            }
            List<List<String>> queryResult = new ArrayList<>(keys.size());
            for (String key : keys) {
                // NOTE(review): presumably returns one value per requested field, in the
                // iteration order of the given set — confirm against RedisClient.
                // A fresh copy is passed per call; LinkedHashSet keeps the order of schema.
                List<String> values =
                    (List<String>) redisClient.getAllSetReulst(key, new LinkedHashSet<Object>(schema));
                // null convert to ""
                values = values.stream()
                    .map(value -> value == null ? "" : value)
                    .collect(Collectors.toList());
                queryResult.add(values);
            }
            sqlQueryResultDto.setResultSet(queryResult);
            sqlQueryResultDto.setColumnList(new ArrayList<>(schema));
        }
        return sqlQueryResultDto;
    }

    /** Converts a raw Redis string to a counter value, treating a missing value as 0. */
    private static long parseCount(String raw) {
        return raw == null ? 0L : Long.parseLong(raw);
    }

}
